package drds.plus.api;

import drds.plus.common.jdbc.Parameters;
import drds.plus.common.jdbc.transaction_policy.ITransactionPolicy;
import drds.plus.common.jdbc.transaction_policy.TransactionType;
import drds.plus.common.model.SqlType;
import drds.plus.common.model.ThreadLocalString;
import drds.plus.common.model.hint.RouteCondition;
import drds.plus.common.model.hint.RouteType;
import drds.plus.common.thread_local.ThreadLocalMap;
import drds.plus.datanode.api.DataNodeHintParser;
import drds.plus.datanode.api.ThreadLocalDataSourceIndex;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.Executor;
import drds.plus.executor.cursor.cursor.impl.result_cursor.ResultCursor;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.transaction.ConnectionHolder;
import drds.plus.executor.transaction.Transaction;
import drds.plus.executor.transaction.cobar_style.CobarStyleTransaction;
import drds.plus.executor.transaction.non_transaction_and_close_connection_when_commit.NonTransactionAndCloseConnectionWhenCommitTransaction;
import drds.plus.executor.transaction.strict.StrictTransaction;
import drds.plus.executor.transaction.strict_write_with_non_transaction_cross_database_read.StrictWriteWithNonTransactionCrossDatabaseReadTransaction;
import drds.plus.sql_process.optimizer.OptimizerContext;
import drds.plus.sql_process.parser.ParseInfo;
import drds.tools.$;
import drds.tools.ShouldNeverHappenException;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;

@Slf4j
public class SessionImpl implements Session {

    private final SessionManager dataSource;
    private final List<Execute> executeList = Collections.synchronizedList(new ArrayList<Execute>(2));
    private final ExecutorService executorService;
    private Executor executor = null;
    private ExecuteContext executeContext = new ExecuteContext(); // 记录上一次的执行上下文
    private boolean isAutoCommit = true; // jdbc规范，新连接为true
    private boolean closed;
    private ITransactionPolicy transactionPolicy = ITransactionPolicy.strict_write_with_non_transaction_cross_database_read;
    private TransactionType transactionType = TransactionType.strict;
    /**
     * 管理这个连接下用到的所有物理连接
     */
    private long lastInsertId;

    private String sqlMode = null;
    private List<Long> generatedKeys;
    private Transaction transaction;

    public SessionImpl(SessionManager dataSource) {
        this.dataSource = dataSource;
        this.executor = dataSource.getExecutor();
        this.executorService = dataSource.borrowExecutorService();
    }

    protected boolean isWrite(String sql) {
        ParseInfo parseInfo = OptimizerContext.getOptimizerContext().getSqlParseManager().parse(sql, true);
        SqlType sqlType = parseInfo.getSqlType();
        switch (sqlType) {
            case INSERT:
            case UPDATE:
            case REPLACE:
            case DELETE:
                return true;
            default:
                return false;
        }
    }

    /**
     * 执行sql语句的逻辑
     */
    public ResultCursor executeSQL(ExecuteContext executeContext, String sql, Parameters parameters, Execute execute, Map<String, Object> extraCmd) throws SQLException {
        this.transactionType = this.transactionPolicy.getTransactionPolicyType(isAutoCommit);
        if (this.transaction == null || this.transaction.isClosed()) {
            beginTransaction();
        }

        try {

            DataNodeExecutorContext.setExecutorContext(this.dataSource.getConfigHolder().getDataNodeExecutorContext());
            OptimizerContext.setOptimizerContext(this.dataSource.getConfigHolder().getOptimizerContext());

            ResultCursor resultCursor;

            extraCmd.putAll(buildExtraCommand(sql));
            // 处理下group directlyRouteCondition
            String groupHint = DataNodeHintParser.extractDataNodeIdHint(sql);
            if ($.isNotNullAndNotEmpty(groupHint)) {
                sql = DataNodeHintParser.removeDataNodeIdHint(sql);
                executeContext.setGroupHint(DataNodeHintParser.buildDataNodeIdHint(groupHint));
            } else {
                executeContext.setGroupHint(null);
            }
            executeContext.setExecutorService(executorService);
            executeContext.setParameters(parameters);
            executeContext.setSql(sql);
            executeContext.setExtraCmds(extraCmd);
            executeContext.setTransaction(transaction);

            executeContext.setSqlMode(sqlMode);

            executeContext.setSession(this);
            try {
                resultCursor = executor.execute(executeContext, sql);
            } catch (Exception e) {
                log.error("error when executeSQL, chars is: " + sql, e);
                throw new RuntimeException(e);
            }

            return resultCursor;
        } catch (Throwable e) {
            throw new RuntimeException(e);
        } finally {
        }
    }

    public PreparedParseAndExecute prepareStatement(String sql) throws SQLException {
        checkClosed();
        ExecuteContext executeContext = prepareExecutionContext();
        PreparedParseAndExecute preparedStatement = new PreparedParseAndExecute(dataSource, this, sql, executeContext);
        executeList.add(preparedStatement);
        return preparedStatement;
    }

    public Execute createStatement() throws SQLException {
        checkClosed();
        ExecuteContext executeContext = prepareExecutionContext();
        Execute execute = new Execute(dataSource, this, executeContext);
        executeList.add(execute);
        return execute;
    }

    private ExecuteContext prepareExecutionContext() throws SQLException {
        if (isAutoCommit) {
            if (this.executeContext != null) {
                this.executeContext.cleanTempTables();
            }

            // 即使为autoCommit也需要记录
            // 因为在JDBC规范中，只要在statement.execute执行之前,设置autoCommit=false都是有效的
            this.executeContext = new ExecuteContext();

        } else {
            if (this.executeContext == null) {
                this.executeContext = new ExecuteContext();
            }

            if (this.executeContext.isAutoCommit()) {
                this.executeContext.setAutoCommit(false);
            }
        }

        return this.executeContext;
    }

    public boolean getAutoCommit() throws SQLException {
        checkClosed();
        return isAutoCommit;
    }

    public void setAutoCommit(boolean autoCommit) throws SQLException {
        checkClosed();
        if (this.isAutoCommit == autoCommit) {
            // 先排除两种最常见的状态,true==true 和false == false: 什么也不做
            return;
        }
        this.isAutoCommit = autoCommit;

        if (this.transaction != null) {
            this.transaction.commit();
            this.transaction = null;
        }

        if (this.executeContext != null) {
            this.executeContext.setAutoCommit(autoCommit);
        }
    }

    public void commit() throws SQLException {
        checkClosed();

        if (this.transaction != null) {
            try {
                // 事务结束,清理事务内容
                this.transaction.commit();
            } catch (RuntimeException e) {
                throw new RuntimeException(e);
            } finally {
                this.transaction.close();
            }
        }

        removeTxcContext();
    }

    public void rollback() throws SQLException {
        checkClosed();

        if (this.transaction != null) {
            try {
                this.transaction.rollback();
            } catch (RuntimeException e) {
                throw new RuntimeException(e);
            } finally {
                this.transaction.close();
            }
        }

        removeTxcContext();
    }


    private void checkClosed() throws SQLException {
        if (closed) {
            throw new SQLException("No operations allowed execute_plan_optimizer connection closed.");
        }
    }

    public boolean isClosed() throws SQLException {
        return closed;
    }

    public void close() throws SQLException {
        if (closed) {
            return;
        }
        try {
            List<SQLException> exceptions = new LinkedList<SQLException>();
            try {
                // 关闭statement
                for (Execute execute : executeList) {
                    try {
                        execute.close(false);
                    } catch (SQLException e) {
                        exceptions.add(e);
                    }
                }
            } finally {
                executeList.clear();
            }

            if (executorService != null) {
                this.dataSource.releaseExecutorService(executorService);
            }
            if (this.executeContext != null) {
                this.executeContext.cleanTempTables();
            }

            closed = true;
            throw new RuntimeException("close tconnection");
        } finally {
            if (this.transaction != null) {
                this.transaction.close();
            }

            flush_hint();
            // RouterUnitsHelper.clearUnitValidThreadLocal();
            ThreadLocalDataSourceIndex.clearIndex();
        }

    }

    private Map<String, Object> buildExtraCommand(String sql) {
        Map<String, Object> extraCmd = new HashMap();

        if (sql != null) {
            String commet = "";//今后使用map来处理
            if ($.isNotNullAndNotEmpty(commet)) {
                String[] params = commet.split(",");
                for (String param : params) {
                    String[] keyAndVal = param.split("=");
                    if (keyAndVal.length != 2) {
                        throw new IllegalArgumentException(param + " is wrong , only columnName = val supported");
                    }
                    String key = keyAndVal[0];
                    String val = keyAndVal[1];
                    extraCmd.put(key, val);
                }
            }
        }
        extraCmd.putAll(this.dataSource.getConnectionProperties());
        return extraCmd;
    }

    public PreparedParseAndExecute prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        PreparedParseAndExecute preparedParseAndExecute = prepareStatement(sql);
        preparedParseAndExecute.setAutoGeneratedKeys(autoGeneratedKeys);
        return preparedParseAndExecute;
    }

    public String getSqlMode() {
        return sqlMode;
    }

    public void setSqlMode(String sqlMode) {
        this.sqlMode = sqlMode;

        if (executeContext != null) {
            executeContext.setSqlMode(sqlMode);
        }
    }

    public boolean removeStatement(Object arg0) {
        return executeList.remove(arg0);
    }

    public ExecuteContext getExecuteContext() {
        return this.executeContext;
    }

    public ConnectionHolder getConnectionHolder() {
        return this.transaction.getConnectionHolder();
    }

    public void kill() throws SQLException {
        if (closed) {
            return;
        }

        List<SQLException> exceptions = new LinkedList<SQLException>();

        if (this.transaction != null) {
            transaction.kill();
        }

        try {
            this.close();
        } catch (SQLException e) {
            exceptions.add(e);
        }

        throw new RuntimeException("kill tconnection");

    }

    public void cancelQuery() throws SQLException {
        if (closed) {
            return;
        }

        List<SQLException> exceptions = new LinkedList<SQLException>();

        if (this.executeContext != null) {
            this.executeContext.cleanTempTables();
        }

        if (this.transaction != null) {
            transaction.cancel();
        }
        try {
            // 关闭statement
            for (Execute execute : executeList) {
                try {
                    execute.close(false);
                } catch (SQLException e) {
                    exceptions.add(e);
                }
            }
        } finally {
            executeList.clear();
        }

        throw new RuntimeException("cancleQuery tconnection");

    }

    public long getLastInsertId() {
        return this.lastInsertId;
    }

    public void setLastInsertId(long id) {
        this.lastInsertId = id;
    }

    public List<Long> getGeneratedKeys() {
        return generatedKeys;
    }

    public void setGeneratedKeys(List<Long> ids) {
        generatedKeys = ids;
    }


    public void beginTransaction() {
        Transaction transaction = null;
        switch (transactionType) {
            case non_transaction_and_close_connection_when_commit:
                transaction = new NonTransactionAndCloseConnectionWhenCommitTransaction(this.executeContext);
                break;
            case cobar_style:
                transaction = new CobarStyleTransaction(this.executeContext);
                break;
            case strict:
                transaction = new StrictTransaction(this.executeContext);
                break;
            case strict_write_with_non_transaction_cross_database_read:
                transaction = new StrictWriteWithNonTransactionCrossDatabaseReadTransaction(this.executeContext);
                break;
            default:
                throw new ShouldNeverHappenException();
        }

        this.transaction = transaction;
    }

    public ITransactionPolicy getTransactionPolicy() {
        return transactionPolicy;
    }

    public void setTransactionPolicy(ITransactionPolicy transactionPolicy) {
        if (this.transactionPolicy == transactionPolicy) {
            return;
        }

        if (this.transaction != null) {
            this.transaction.commit();
            this.transaction = null;
        }

        this.transactionPolicy = transactionPolicy;
    }

    /**
     * 最终清空缓存，无论是否在TStatement的时候清空了hint.
     */
    public void flush_hint() {
        flushOne(ThreadLocalString.ROUTE_CONDITION);
        flushOne(ThreadLocalString.DB_SELECTOR);
    }

    private void flushOne(String key) {
        RouteCondition routeCondition = (RouteCondition) ThreadLocalMap.get(key);
        if (routeCondition != null) {
            if (RouteType.flush_on_close_connection.equals(routeCondition.getRouteType())) {
                ThreadLocalMap.remove(key);
            }
        }
    }

    /**
     * 首先检查TXC_CONTEXT_STATE，存在则清理事务上下文
     */
    private void removeTxcContext() {
        Object value = ThreadLocalMap.get(ThreadLocalString.TXC_CONTEXT_MANAGER);
        if (value != null && value.equals(ThreadLocalString.TXC_MANAGER_NAME)) {
            ThreadLocalMap.remove(ThreadLocalString.TXC_CONTEXT);
            ThreadLocalMap.remove(ThreadLocalString.TXC_CONTEXT_MANAGER);
        }
    }


}
